{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Custom Estimator with Keras\n",
    "\n",
    "**Learning Objectives**\n",
    "- Learn how to create a custom estimator using `tf.keras`\n",
    "    \n",
    "## Introduction \n",
    "Up until now we've been limited in our model architectures to premade estimators. But what if we want more control over the model? We can use the popular Keras API to create a custom model. Keras is a high-level API for building and training deep learning models. It is user-friendly and modular, and it makes writing custom building blocks of TensorFlow code much easier. \n",
    "\n",
    "Once we've built a Keras model, we convert it to an estimator using `tf.keras.estimator.model_to_estimator()`. This gives us access to all the flexibility of Keras for creating deep learning models, along with the production readiness of the estimator framework!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import tensorflow as tf\n",
    "import numpy as np\n",
    "import shutil\n",
    "print(tf.__version__)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Train and Evaluate input functions\n",
    "\n",
    "For the most part, we can use the same train and evaluation input functions that we had in previous labs. Note the new function `create_feature_keras_input` below. It applies our feature columns to the feature dictionary and returns a single dense tensor, which becomes the input to the first layer of the Keras model. Both `train_input_fn` and `eval_input_fn` call it as their final step."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "CSV_COLUMN_NAMES = [\"fare_amount\",\"dayofweek\",\"hourofday\",\"pickuplon\",\"pickuplat\",\"dropofflon\",\"dropofflat\"]\n",
    "CSV_DEFAULTS = [[0.0],[1],[0],[-74.0], [40.0], [-74.0], [40.7]]\n",
    "\n",
    "def read_dataset(csv_path):\n",
    "    def parse_row(row):\n",
    "        # Decode the CSV row into list of TF tensors\n",
    "        fields = tf.decode_csv(records = row, record_defaults = CSV_DEFAULTS)\n",
    "\n",
    "        # Pack the result into a dictionary\n",
    "        features = dict(zip(CSV_COLUMN_NAMES, fields))\n",
    "        \n",
    "        # NEW: Add engineered features\n",
    "        features = add_engineered_features(features)\n",
    "        \n",
    "        # Separate the label from the features\n",
    "        label = features.pop(\"fare_amount\") # remove label from features and store\n",
    "\n",
    "        return features, label\n",
    "    \n",
    "    # Create a dataset containing the text lines.\n",
    "    dataset = tf.data.Dataset.list_files(file_pattern = csv_path) # (i.e. data_file_*.csv)\n",
    "    dataset = dataset.flat_map(map_func = lambda filename: tf.data.TextLineDataset(filenames = filename).skip(count = 1))\n",
    "\n",
    "    # Parse each CSV row into correct (features,label) format for Estimator API\n",
    "    dataset = dataset.map(map_func = parse_row)\n",
    "    \n",
    "    return dataset\n",
    "  \n",
    "def create_feature_keras_input(features, label):\n",
    "    features = tf.feature_column.input_layer(features = features, feature_columns = create_feature_columns())\n",
    "    return features, label\n",
    "\n",
    "def train_input_fn(csv_path, batch_size = 128):\n",
    "    #1. Convert CSV into tf.data.Dataset with (features, label) format\n",
    "    dataset = read_dataset(csv_path)\n",
    "      \n",
    "    #2. Shuffle, repeat, and batch the examples.\n",
    "    dataset = dataset.shuffle(buffer_size = 1000).repeat(count = None).batch(batch_size = batch_size)\n",
    "    \n",
    "    #3. Create single feature tensor for input to Keras Model\n",
    "    dataset = dataset.map(map_func = create_feature_keras_input)\n",
    "   \n",
    "    return dataset\n",
    "\n",
    "def eval_input_fn(csv_path, batch_size = 128):\n",
    "    #1. Convert CSV into tf.data.Dataset with (features, label) format\n",
    "    dataset = read_dataset(csv_path)\n",
    "\n",
    "    #2.Batch the examples.\n",
    "    dataset = dataset.batch(batch_size = batch_size)\n",
    "    \n",
    "    #3. Create single feature tensor for input to Keras Model\n",
    "    dataset = dataset.map(map_func = create_feature_keras_input)\n",
    "   \n",
    "    return dataset"
   ]
  },
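  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The row-parsing step inside `read_dataset` operates on tensors, which makes it hard to inspect by eye. As a rough illustration only, the sketch below mirrors the same zip-then-pop logic in plain Python using the standard `csv` module. The helper `parse_row_sketch`, the constant `SKETCH_COLUMN_NAMES`, and the sample row are made up for this example. The engineered-feature step is left out, and every field is cast to float for simplicity (the TensorFlow version keeps `dayofweek` and `hourofday` as integers).\n",
    "\n",
    "```python\n",
    "import csv\n",
    "\n",
    "# Same column order as CSV_COLUMN_NAMES; the label comes first\n",
    "SKETCH_COLUMN_NAMES = [\"fare_amount\", \"dayofweek\", \"hourofday\",\n",
    "                       \"pickuplon\", \"pickuplat\", \"dropofflon\", \"dropofflat\"]\n",
    "\n",
    "def parse_row_sketch(line):\n",
    "    # csv.reader yields lists of strings; convert every field to float\n",
    "    fields = [float(value) for value in next(csv.reader([line]))]\n",
    "    # Pack the values into a dictionary keyed by column name\n",
    "    features = dict(zip(SKETCH_COLUMN_NAMES, fields))\n",
    "    # Remove the label so only model inputs remain in the dictionary\n",
    "    label = features.pop(\"fare_amount\")\n",
    "    return features, label\n",
    "\n",
    "# Hypothetical row, not taken from the lab's data files\n",
    "features, label = parse_row_sketch(\"12.5,3,17,-73.98,40.75,-73.95,40.78\")\n",
    "print(label)             # the fare amount\n",
    "print(sorted(features))  # the six remaining feature names\n",
    "```\n",
    "\n",
    "The returned pair has the same `(features, label)` shape that each element of the dataset has before batching."
   ]
  },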
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Feature Engineering\n",
    "\n",
    "We'll use the same engineered features that we had in previous labs."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def add_engineered_features(features):\n",
    "    features[\"dayofweek\"] = features[\"dayofweek\"] - 1 # subtract one since our days of week are 1-7 instead of 0-6\n",
    "    \n",
    "    features[\"latdiff\"] = features[\"pickuplat\"] - features[\"dropofflat\"] # North/South\n",
    "    features[\"londiff\"] = features[\"pickuplon\"] - features[\"dropofflon\"] # East/West\n",
    "    features[\"euclidean_dist\"] = tf.sqrt(x = features[\"latdiff\"]**2 + features[\"londiff\"]**2)\n",
    "\n",
    "    return features"
   ]
  },
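  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To make the three distance features concrete, here is a minimal NumPy sketch of the same arithmetic on a single trip. The function name `engineered_features_sketch` and the coordinates are hypothetical, chosen so that the two differences form a 3-4-5 triangle.\n",
    "\n",
    "```python\n",
    "import numpy as np\n",
    "\n",
    "def engineered_features_sketch(pickuplat, pickuplon, dropofflat, dropofflon):\n",
    "    latdiff = pickuplat - dropofflat  # north/south displacement in degrees\n",
    "    londiff = pickuplon - dropofflon  # east/west displacement in degrees\n",
    "    euclidean_dist = np.sqrt(latdiff ** 2 + londiff ** 2)\n",
    "    return latdiff, londiff, euclidean_dist\n",
    "\n",
    "# Hypothetical coordinates, not taken from the lab's data files\n",
    "latdiff, londiff, dist = engineered_features_sketch(\n",
    "    pickuplat=40.0, pickuplon=-74.0, dropofflat=40.3, dropofflon=-73.6)\n",
    "print(latdiff, londiff, dist)  # roughly -0.3, -0.4 and 0.5\n",
    "```\n",
    "\n",
    "Note that the distance is measured in degrees rather than kilometres, so it is only a proxy for trip length."
   ]
  },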
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def create_feature_columns():\n",
    "    # Treat dayofweek and hourofday as categorical integer IDs\n",
    "    fc_dayofweek = tf.feature_column.categorical_column_with_identity(key = \"dayofweek\", num_buckets = 7)\n",
    "    fc_hourofday = tf.feature_column.categorical_column_with_identity(key = \"hourofday\", num_buckets = 24)\n",
    "\n",
    "    # Cross features to get combination of day and hour\n",
    "    fc_day_hr = tf.feature_column.crossed_column(keys = [fc_dayofweek, fc_hourofday], hash_bucket_size = 24 * 7)\n",
    "\n",
    "    # Bucketize latitudes and longitudes\n",
    "    NBUCKETS = 16\n",
    "    latbuckets = np.linspace(start = 38.0, stop = 42.0, num = NBUCKETS).tolist()\n",
    "    lonbuckets = np.linspace(start = -76.0, stop = -72.0, num = NBUCKETS).tolist()\n",
    "    fc_bucketized_plon = tf.feature_column.bucketized_column(source_column = tf.feature_column.numeric_column(key = \"pickuplon\"), boundaries = lonbuckets)\n",
    "    fc_bucketized_plat = tf.feature_column.bucketized_column(source_column = tf.feature_column.numeric_column(key = \"pickuplat\"), boundaries = latbuckets)\n",
    "    fc_bucketized_dlon = tf.feature_column.bucketized_column(source_column = tf.feature_column.numeric_column(key = \"dropofflon\"), boundaries = lonbuckets)\n",
    "    fc_bucketized_dlat = tf.feature_column.bucketized_column(source_column = tf.feature_column.numeric_column(key = \"dropofflat\"), boundaries = latbuckets)\n",
    "\n",
    "    feature_columns = [\n",
    "        #1. Engineered using tf.feature_column module\n",
    "        tf.feature_column.indicator_column(categorical_column = fc_day_hr), # 168 columns\n",
    "        fc_bucketized_plat, # 16 + 1 = 17 columns\n",
    "        fc_bucketized_plon, # 16 + 1 = 17 columns\n",
    "        fc_bucketized_dlat, # 16 + 1 = 17 columns\n",
    "        fc_bucketized_dlon, # 16 + 1 = 17 columns\n",
    "        #2. Engineered in input functions\n",
    "        tf.feature_column.numeric_column(key = \"latdiff\"), # 1 column\n",
    "        tf.feature_column.numeric_column(key = \"londiff\"), # 1 column\n",
    "        tf.feature_column.numeric_column(key = \"euclidean_dist\") # 1 column\n",
    "    ]\n",
    "  \n",
    "    return feature_columns"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Calculate the number of feature columns that will be input to our Keras model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "num_feature_columns = 168 + (16 + 1) * 4 + 3\n",
    "print(\"num_feature_columns = {}\".format(num_feature_columns))"
   ]
  },
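  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `16 + 1` term comes from the fact that 16 boundaries split the number line into 17 intervals, including the two open-ended ones at either side. The sketch below illustrates this with `np.digitize`, used here as a stand-in for the bucketized column on the assumption that both assign a value to the interval whose lower boundary it has reached. The sample latitudes and the names ending in `_sketch` are hypothetical.\n",
    "\n",
    "```python\n",
    "import numpy as np\n",
    "\n",
    "NBUCKETS_SKETCH = 16\n",
    "boundaries_sketch = np.linspace(start=38.0, stop=42.0, num=NBUCKETS_SKETCH)\n",
    "\n",
    "# Hypothetical latitudes: one below the range, one inside it, one above it\n",
    "sample_lats = np.array([37.0, 40.7, 43.0])\n",
    "bucket_ids = np.digitize(sample_lats, boundaries_sketch)\n",
    "print(bucket_ids)  # lowest possible id is 0, highest is 16\n",
    "\n",
    "# 16 boundaries give 17 buckets per bucketized column\n",
    "buckets_per_column = len(boundaries_sketch) + 1\n",
    "# crossed day-hour column + four bucketized columns + three numeric columns\n",
    "total_columns_sketch = 24 * 7 + 4 * buckets_per_column + 3\n",
    "print(buckets_per_column, total_columns_sketch)  # 17 239\n",
    "```\n",
    "\n",
    "The total matches the width of the input layer that the Keras model expects."
   ]
  },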
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Build Custom Keras Model\n",
    "\n",
    "Now we can begin building our Keras model. Have a look at [the Keras guide](https://www.tensorflow.org/guide/keras) for a more detailed explanation."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def create_keras_model():\n",
    "    model = tf.keras.Sequential()\n",
    "    model.add(tf.keras.layers.InputLayer(input_shape = (num_feature_columns,), name = \"dense_input\"))\n",
    "    model.add(tf.keras.layers.Dense(units = 64, activation = \"relu\", name = \"dense0\"))\n",
    "    model.add(tf.keras.layers.Dense(units = 64, activation = \"relu\", name = \"dense1\"))\n",
    "    model.add(tf.keras.layers.Dense(units = 64, activation = \"relu\", name = \"dense2\"))\n",
    "    model.add(tf.keras.layers.Dense(units = 64, activation = \"relu\", name = \"dense3\"))\n",
    "    model.add(tf.keras.layers.Dense(units = 8, activation = \"relu\", name = \"dense4\"))\n",
    "    model.add(tf.keras.layers.Dense(units = 1, activation = None, name = \"logits\"))\n",
    "\n",
    "    def rmse(y_true, y_pred): # Root Mean Squared Error\n",
    "        return tf.sqrt(x = tf.reduce_mean(input_tensor = tf.square(x = y_pred - y_true)))\n",
    "\n",
    "    model.compile(\n",
    "        optimizer = tf.train.AdamOptimizer(),\n",
    "        loss = \"mean_squared_error\",\n",
    "        metrics = [rmse])\n",
    "  \n",
    "    return model"
   ]
  },
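  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The model is trained on mean squared error while `rmse` is reported as a metric, and the metric function only ever sees one batch at a time. Assuming the reported value is an average of those per-batch results (an assumption about how the metric is aggregated, not something this notebook verifies), it need not equal the RMSE computed over all examples at once. The NumPy sketch below shows the gap on two tiny batches; the fares, predictions, and the helper `rmse_sketch` are hypothetical.\n",
    "\n",
    "```python\n",
    "import numpy as np\n",
    "\n",
    "def rmse_sketch(y_true, y_pred):\n",
    "    # Root mean squared error over the examples passed in\n",
    "    return np.sqrt(np.mean(np.square(y_pred - y_true)))\n",
    "\n",
    "# Batch 1 is predicted perfectly; batch 2 is off by 2 on every example\n",
    "batch1_true, batch1_pred = np.array([10.0, 12.0]), np.array([10.0, 12.0])\n",
    "batch2_true, batch2_pred = np.array([8.0, 6.0]), np.array([10.0, 8.0])\n",
    "\n",
    "per_batch = [rmse_sketch(batch1_true, batch1_pred),\n",
    "             rmse_sketch(batch2_true, batch2_pred)]\n",
    "mean_of_batch_rmse = np.mean(per_batch)\n",
    "\n",
    "overall_rmse = rmse_sketch(np.concatenate([batch1_true, batch2_true]),\n",
    "                           np.concatenate([batch1_pred, batch2_pred]))\n",
    "print(mean_of_batch_rmse, overall_rmse)  # 1.0 versus about 1.414\n",
    "```\n",
    "\n",
    "If an exact dataset-level RMSE is needed, one option is to take the square root of the reported mean squared error loss instead."
   ]
  },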
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Serving input function\n",
    "\n",
    "Once we've constructed our model in Keras, we next create the serving input function. This is also similar to what we have done in previous labs. Note that we use our `create_feature_keras_input` function again so that we perform our feature engineering during inference."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create serving input function\n",
    "def serving_input_fn():\n",
    "    feature_placeholders = {\n",
    "        \"dayofweek\": tf.placeholder(dtype = tf.int32, shape = [None]),\n",
    "        \"hourofday\": tf.placeholder(dtype = tf.int32, shape = [None]),\n",
    "        \"pickuplon\": tf.placeholder(dtype = tf.float32, shape = [None]),\n",
    "        \"pickuplat\": tf.placeholder(dtype = tf.float32, shape = [None]),\n",
    "        \"dropofflon\": tf.placeholder(dtype = tf.float32, shape = [None]),\n",
    "        \"dropofflat\": tf.placeholder(dtype = tf.float32, shape = [None]),\n",
    "    }\n",
    "\n",
    "    features = {key: tensor for key, tensor in feature_placeholders.items()}\n",
    "\n",
    "    # Perform our feature engineering during inference as well\n",
    "    features, _ = create_feature_keras_input((add_engineered_features(features)), None)\n",
    "\n",
    "    return tf.estimator.export.ServingInputReceiver(features = {\"dense_input\": features}, receiver_tensors = feature_placeholders)"
   ]
  },
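  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The receiver tensors determine which raw fields a client has to send at prediction time. As a hedged illustration, the sketch below builds one request body in the `{\"instances\": [...]}` row format used by TensorFlow Serving's REST API; whether that format applies depends on how the exported model is deployed. The helper `make_instance`, the constant `RAW_FEATURE_TYPES`, and the trip values are hypothetical.\n",
    "\n",
    "```python\n",
    "import json\n",
    "\n",
    "# Keys and types mirror the placeholders in serving_input_fn\n",
    "RAW_FEATURE_TYPES = {\n",
    "    \"dayofweek\": int, \"hourofday\": int,\n",
    "    \"pickuplon\": float, \"pickuplat\": float,\n",
    "    \"dropofflon\": float, \"dropofflat\": float,\n",
    "}\n",
    "\n",
    "def make_instance(**raw_values):\n",
    "    # Reject examples that lack any of the expected raw features\n",
    "    missing = set(RAW_FEATURE_TYPES) - set(raw_values)\n",
    "    if missing:\n",
    "        raise ValueError(\"missing features: {}\".format(sorted(missing)))\n",
    "    # Cast each value to the type of its placeholder\n",
    "    return {key: cast(raw_values[key]) for key, cast in RAW_FEATURE_TYPES.items()}\n",
    "\n",
    "# Hypothetical trip; dayofweek stays in the raw 1-7 range because the\n",
    "# serving function subtracts one itself\n",
    "instance = make_instance(dayofweek=3, hourofday=17, pickuplon=-73.98,\n",
    "                         pickuplat=40.75, dropofflon=-73.95, dropofflat=40.78)\n",
    "request_body = json.dumps({\"instances\": [instance]})\n",
    "print(request_body)\n",
    "```\n",
    "\n",
    "Clients send only the six raw fields; the engineered features and the dense input tensor are computed inside the exported graph."
   ]
  },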
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Train and Evaluate\n",
    "\n",
    "To train our model, we can use `train_and_evaluate` as we have before. Note that we use `tf.keras.estimator.model_to_estimator` to create our estimator. It takes as arguments the compiled Keras model, the output directory (`OUTDIR`), and optionally a `tf.estimator.RunConfig`. Have a look at [the documentation for tf.keras.estimator.model_to_estimator](https://www.tensorflow.org/api_docs/python/tf/keras/estimator/model_to_estimator) to make sure you understand how the arguments are used."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def train_and_evaluate(output_dir):\n",
    "    tf.logging.set_verbosity(v = tf.logging.INFO) # so loss is printed during training\n",
    "        \n",
    "    estimator = tf.keras.estimator.model_to_estimator(\n",
    "        keras_model = create_keras_model(),\n",
    "        model_dir = output_dir,\n",
    "        config = tf.estimator.RunConfig(\n",
    "            tf_random_seed = 1, # for reproducibility\n",
    "            save_checkpoints_steps = 100 # checkpoint every N steps\n",
    "        )\n",
    "    )\n",
    "\n",
    "    train_spec = tf.estimator.TrainSpec(\n",
    "        input_fn = lambda: train_input_fn(csv_path = \"./taxi-train.csv\"),\n",
    "        max_steps = 500)\n",
    "\n",
    "    exporter = tf.estimator.LatestExporter(name = 'exporter', serving_input_receiver_fn = serving_input_fn)\n",
    "\n",
    "    eval_spec = tf.estimator.EvalSpec(\n",
    "        input_fn = lambda: eval_input_fn(csv_path = \"./taxi-valid.csv\"),\n",
    "        steps = None,\n",
    "        start_delay_secs = 10, # wait at least N seconds before first evaluation (default 120)\n",
    "        throttle_secs = 10, # wait at least N seconds before each subsequent evaluation (default 600)\n",
    "        exporters = exporter) # LatestExporter writes a SavedModel after each evaluation\n",
    "\n",
    "    tf.estimator.train_and_evaluate(\n",
    "        estimator = estimator, \n",
    "        train_spec = train_spec, \n",
    "        eval_spec = eval_spec)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "OUTDIR = \"taxi_trained\"\n",
    "shutil.rmtree(path = OUTDIR, ignore_errors = True) # start fresh each time\n",
    "tf.summary.FileWriterCache.clear() # ensure filewriter cache is clear for TensorBoard events file\n",
    "\n",
    "train_and_evaluate(OUTDIR)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Copyright 2019 Google Inc. Licensed under the Apache License, Version 2.0 (the \"License\"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.4"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
